package com.atuguigu.flink.Day08;

import com.atuguigu.flink.Day01.Singlesensor.SensorSource;
import com.atuguigu.flink.sensor.SendsorReading;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

//处理时间窗口的平均值
/**
 * Demo job: average temperature per sensor over 10-second tumbling
 * processing-time windows, implemented twice on the same input stream —
 * once with the Table API and once with SQL.
 *
 * <p>Only readings whose {@code id} equals {@code "sensor_1"} are kept. Both
 * results are converted to retract streams and printed; the Table API output
 * is labelled {@code "table-api"} so it can be told apart from the SQL output.
 */
public class Example4 {
    /**
     * Builds both pipelines and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Obtain the stream execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Input stream, restricted to readings whose id is "sensor_1".
        SingleOutputStreamOperator<SendsorReading> stream = env
                .addSource(new SensorSource())
                .filter(r -> r.id.equals("sensor_1"));

        // Table settings (streaming mode) and the table environment bound to env.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 1) Table API
        // "pt" is a new column declared as the processing-time attribute.
        Table table = tEnv.fromDataStream(
                stream,
                $("id"),
                $("temperture").as("temp"),
                $("timestamp").as("ts"),
                $("pt").proctime());
        Table tableResult = table
                .window(Tumble.over(lit(10).second()).on($("pt")).as("w"))
                .groupBy($("id"), $("w"))
                .select($("id"), $("temp").avg(), $("w").end());

        // The converted stream was previously discarded, so this pipeline
        // produced no visible output; attach a labelled print sink.
        tEnv.toRetractStream(tableResult, Row.class).print("table-api");

        // 2) SQL
        // Register the same stream as view "sensor" with the same column mapping.
        tEnv.createTemporaryView("sensor",
                stream,
                $("id"),
                $("temperture").as("temp"),
                $("timestamp").as("ts"),
                $("pt").proctime());

        // Same aggregation as above, additionally selecting the window start.
        Table sqlResult = tEnv.sqlQuery(
                "SELECT id, AVG(temp), "
                        + "TUMBLE_START(pt, INTERVAL '10' SECOND), "
                        + "TUMBLE_END(pt, INTERVAL '10' SECOND) "
                        + "FROM sensor "
                        + "GROUP BY id, TUMBLE(pt, INTERVAL '10' SECOND)");
        tEnv.toRetractStream(sqlResult, Row.class).print();

        env.execute();
    }

}
